-
Couldn't load subscription status.
- Fork 2.5k
Implement Request and Response Policy Based Routing in Cluster Mode #3422
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: load-balance-search-commands-to-shards
Are you sure you want to change the base?
Implement Request and Response Policy Based Routing in Cluster Mode #3422
Conversation
feat(routing): add internal request/response policy enums
* feat: load the policy table in cluster client * Remove comments
…or osscluster.go (#6) * centralize cluster command routing in osscluster_router.go and refactor osscluster.go * enalbe ci on all branches * Add debug prints * Add debug prints * FIX: deal with nil policy * FIX: fixing clusterClient process * chore(osscluster): simplify switch case * wip(command): ai generated clone method for commands * feat: implement response aggregator for Redis cluster commands * feat: implement response aggregator for Redis cluster commands * fix: solve concurrency errors * fix: solve concurrency errors * return MaxRedirects settings * remove locks from getCommandPolicy * Handle MOVED errors more robustly, remove cluster reloading at exectutions, ennsure better routing * Fix: supports Process hook test * Fix: remove response aggregation for single shard commands * Add more preformant type conversion for Cmd type * Add router logic into processPipeline --------- Co-authored-by: Nedyalko Dyakov <nedyalko.dyakov@gmail.com>
…ce-search-commands-to-shards
| } | ||
| if result.cmd != nil && result.err == nil { | ||
| // For MGET, extract individual values from the array result | ||
| if strings.ToLower(cmd.Name()) == "mget" { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we actually need this special case?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
cc @ofekshenawa
6e3b627 to
1b2eaa6
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Submitting another partial review.
| } | ||
|
|
||
| func (p *CommandPolicy) CanBeUsedInPipeline() bool { | ||
| return p.Request != ReqAllNodes && p.Request != ReqAllShards && p.Request != ReqMultiShard |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What about special? Can it be used in a pipeline?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
My understanding is that special should be handled on a case-by-case basis
| // ShardPicker chooses “one arbitrary shard” when the request_policy is | ||
| // ReqDefault and the command has no keys. | ||
| type ShardPicker interface { | ||
| Next(total int) int // returns an index in [0,total) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Those are great, can we implement StaticShardPicker or StickyShardPicker that will always return the same shard. I do think this can be helpful for testing. This is not a blocker by any means.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we just add a todo for those, i think they can be useful in the future.
| return strconv.ParseBool(cmd.val) | ||
| return strconv.ParseBool(cmd.Val()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why was this change needed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure, for consistency maybe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well, the more consistent thing would have been to keep it as cmd.val if it is not needed, feel free to return back to cmd.val
| if commandInfoTips != nil { | ||
| if v, ok := commandInfoTips[requestPolicy]; ok { | ||
| if p, err := routing.ParseRequestPolicy(v); err == nil { | ||
| req = p | ||
| } | ||
| } | ||
| if v, ok := commandInfoTips[responsePolicy]; ok { | ||
| if p, err := routing.ParseResponsePolicy(v); err == nil { | ||
| resp = p | ||
| } | ||
| } | ||
| } | ||
| tips := make(map[string]string, len(commandInfoTips)) | ||
| for k, v := range commandInfoTips { | ||
| if k == requestPolicy || k == responsePolicy { | ||
| continue | ||
| } | ||
| tips[k] = v | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can't we do both of those in a single range over commandInfoTips?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure that I completely understand the question
| return nil | ||
| } | ||
|
|
||
| func (cmd *IntPointerSliceCmd) Clone() Cmder { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's tricky here. do we need to return the same pointer or do we only want the value when cloning?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
still not sure where this type is used and if we would like the pointer of the value, cc @ofekshenawa , @htemelski-redis
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Final part of initial review
Overview:
- Let's use atomics when possible.
- Left questions related to the node selection and setting of values.
Overall the design of the solution looks good, would have to do an additional pass over the test files once this review is addressed.
Thank you both @ofekshenawa and @htemelski-redis!
| if c.hasKeys(cmd) { | ||
| // execute on key based shard | ||
| return node.Client.Process(ctx, cmd) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we know that this node servers the slot for the key?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the node should've been selected based on the slot osscluster.go:L1906
func (c *ClusterClient) cmdNode(
| // execute on key based shard | ||
| return node.Client.Process(ctx, cmd) | ||
| } | ||
| return c.executeOnArbitraryShard(ctx, cmd) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since it doesn't matter and there is already some node selected, why not use it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We have two different ways of picking an arbitrary shard, either round robin or a random one
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I understand that, but for some reason there is already a node selected here that may have been selected because MOVED OR normal key based selection. Why do we have to reselect the node? Shouldn't this selection of arbitrary node be done outside, so we do the node selection only one time and the node on line #52 is the one that should be used for this command?
| // Command executed successfully but value extraction failed | ||
| // This is common for complex commands like CLUSTER SLOTS | ||
| // The command already has its result set correctly, so just return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not understand that comment here. Why the value extraction returned nil? Can we make sure the cmd has value set at least? If it doesn't, we may return a cmd with nil value and nil error, which doesn't make sense.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
727a799 to
14bd6e1
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
left some comments related to aggregators
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The aggregators look good, there are some prints left in the code as bunch of unanswered questions. Let's resolve them before merging this. cc @ofekshenawa , @htemelski-redis
| // AggMaxAggregator returns the maximum numeric value from all shards. | ||
| type AggMaxAggregator struct { | ||
| err atomic.Value | ||
| res *util.AtomicMax | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
general question, are those min,max aggregators only for ints?
| // ShardPicker chooses “one arbitrary shard” when the request_policy is | ||
| // ReqDefault and the command has no keys. | ||
| type ShardPicker interface { | ||
| Next(total int) int // returns an index in [0,total) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we just add a todo for those, i think they can be useful in the future.
| return strconv.ParseBool(cmd.val) | ||
| return strconv.ParseBool(cmd.Val()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
well, the more consistent thing would have been to keep it as cmd.val if it is not needed, feel free to return back to cmd.val
| if commandInfoTips != nil { | ||
| if v, ok := commandInfoTips[requestPolicy]; ok { | ||
| if p, err := routing.ParseRequestPolicy(v); err == nil { | ||
| req = p | ||
| } | ||
| } | ||
| if v, ok := commandInfoTips[responsePolicy]; ok { | ||
| if p, err := routing.ParseResponsePolicy(v); err == nil { | ||
| resp = p | ||
| } | ||
| } | ||
| } | ||
| tips := make(map[string]string, len(commandInfoTips)) | ||
| for k, v := range commandInfoTips { | ||
| if k == requestPolicy || k == responsePolicy { | ||
| continue | ||
| } | ||
| tips[k] = v | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if commandInfoTips != nil { | |
| if v, ok := commandInfoTips[requestPolicy]; ok { | |
| if p, err := routing.ParseRequestPolicy(v); err == nil { | |
| req = p | |
| } | |
| } | |
| if v, ok := commandInfoTips[responsePolicy]; ok { | |
| if p, err := routing.ParseResponsePolicy(v); err == nil { | |
| resp = p | |
| } | |
| } | |
| } | |
| tips := make(map[string]string, len(commandInfoTips)) | |
| for k, v := range commandInfoTips { | |
| if k == requestPolicy || k == responsePolicy { | |
| continue | |
| } | |
| tips[k] = v | |
| } | |
| tips := make(map[string]string, len(commandInfoTips)) | |
| for k, v := range commandInfoTips { | |
| if k == requestPolicy { | |
| if p, err := routing.ParseRequestPolicy(v); err == nil { | |
| req = p | |
| } | |
| continue | |
| } | |
| if k == responsePolicy { | |
| if p, err := routing.ParseResponsePolicy(v); err == nil { | |
| resp = p | |
| } | |
| continue | |
| } | |
| tips[k] = v | |
| } | |
| // For MGET without policy, use keyed aggregator | ||
| if cmdName == "mget" { | ||
| return routing.NewDefaultAggregator(true) | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| // Command executed successfully but value extraction failed | ||
| // This is common for complex commands like CLUSTER SLOTS | ||
| // The command already has its result set correctly, so just return |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| func (c *ClusterClient) setCommandValueReflection(cmd Cmder, value interface{}) error { | ||
| cmdValue := reflect.ValueOf(cmd) | ||
| if cmdValue.Kind() != reflect.Ptr || cmdValue.IsNil() { | ||
| return fmt.Errorf("redis: invalid command pointer") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
here as well, preallocate
|
|
||
| defer func() { | ||
| if r := recover(); r != nil { | ||
| cmd.SetErr(fmt.Errorf("redis: failed to set command value: %v", r)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why don't we return the error as well? it will return nil, but the err will be set on the cmd.
| branches: [master, v9, v9.7, v9.8, 'ndyakov/*', 'ofekshenawa/*', 'htemelski/*', 'ce/*', '*'] | ||
| pull_request: | ||
| branches: [master, v9, v9.7, v9.8] | ||
| branches: [master, v9, v9.7, v9.8, 'ndyakov/*', 'ofekshenawa/*', 'htemelski/*', 'ce/*', '*'] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fetch master for those
This PR introduces support for Redis COMMAND-based request_policy and response_policy routing for Redis commands when used in OSS Cluster client.
Key Additions:
Command Policy Loader: Parses and caches COMMAND metadata with routing/aggregation tips on first use.
Routing Engine Enhancements:
Implements support for all request policies: default(keyless), default(hashslot), all_shards, all_nodes, multi_shard, and special.
Response Aggregator: Combines multi-shard replies based on response_policy:
all_succeeded, one_succeeded, agg_sum, special, etc.
Includes custom handling for special commands like FT.CURSOR.
Raw Command Support: Policies are enforced on Client.Do(ctx, args...).